package com.hzmg.akka.cluster;

import akka.actor.*;
import akka.cluster.Cluster;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.cluster.routing.ClusterRouterGroup;
import akka.cluster.routing.ClusterRouterGroupSettings;
import akka.routing.RoundRobinGroup;
import akka.routing.RoundRobinPool;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.hzmg.akka.utils.NetUtils;
import com.hzmg.common.PowerSerializable;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.*;

/**
 * Demo of a single Akka cluster node that hosts a pool of {@link Worker} actors,
 * subscribes that pool to a distributed pub-sub topic and, from {@link #main},
 * publishes a few messages to the topic once the node is a cluster member.
 */
@Slf4j
@Component
public class ActorSystemClusterDemo1 {
    /** Distributed pub-sub topic used by both the worker subscription and the {@link Publisher}. */
    private static final String CONTENT_TOPIC = "content";
    /** Name of the top-level worker pool actor. */
    private static final String WORKER_ACTOR_NAME = "worker";
    /** Local path of the worker pool, used as the routee path of the cluster-aware group router. */
    private static final String WORKER_ROUTEE_PATH = "/user/" + WORKER_ACTOR_NAME;
    /** Number of {@link Worker} routees created behind the round-robin pool. */
    private static final int WORKER_POOL_SIZE = 5;
    /** Port of the (hard-coded) seed node on the local host. */
    private static final int SEED_NODE_PORT = 1001;

    /** {@code host:port} of the most recently created actor system; written by {@link #createActorSystem}. */
    @Getter
    private static String actorSystemAddress;

    /**
     * Creates an actor system, starts the worker pool, subscribes it to the
     * {@value #CONTENT_TOPIC} topic and asks the cluster extension to join the seed node.
     *
     * @param port canonical Artery port for this node, as a string
     * @param name actor system name; it is also used as this node's single cluster role
     * @return the started actor system
     */
    public static ActorSystem createActorSystem(String port, String name) {
        // Base settings come from the classpath resource "hzmg-cluster.akka.conf"
        // (per the original note: working port, serialization, log level, ...).
        // The entries below override host, port and cluster roles for this node.
        String localIP = NetUtils.getLocalHost();
        Map<String, Object> overrideConfig = Maps.newHashMap();
        overrideConfig.put("akka.remote.artery.canonical.hostname", localIP);
        overrideConfig.put("akka.remote.artery.canonical.port", port);
        // Cluster roles: the node gets exactly one role, equal to the system name.
        Set<String> useRoles = Collections.singleton(name);
        overrideConfig.put("akka.cluster.roles", useRoles);

        actorSystemAddress = localIP + ":" + port;
        log.info("[AkkaDemo] akka-remote server address: {}", actorSystemAddress);

        Config akkaBasicConfig = ConfigFactory.load("hzmg-cluster.akka.conf");
        Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);

        // Create the actor system (the actor factory).
        ActorSystem actorSystem = ActorSystem.create(name, akkaFinalConfig);

        // Round-robin pool of workers registered under /user/worker.
        Props workerProps = Props.create(Worker.class, WORKER_ACTOR_NAME + port)
                .withRouter(new RoundRobinPool(WORKER_POOL_SIZE));
        ActorRef router = actorSystem.actorOf(workerProps, WORKER_ACTOR_NAME);

        // Subscribe the pool to the topic. The router is also passed as the sender,
        // so replies from the mediator are addressed to the pool.
        ActorRef mediator = DistributedPubSub.get(actorSystem).mediator();
        mediator.tell(new DistributedPubSubMediator.Subscribe(CONTENT_TOPIC, router), router);
        // Parameterized so the path text is never interpreted as a log format string.
        log.info("{}", router.path());

        // Obtain the cluster extension for this system.
        Cluster cluster = Cluster.get(actorSystem);
        log.info("cluster.selfAddress:{}", cluster.selfAddress());

        // Join the seed node.
        // TODO: replace the hard-coded seed with a method that discovers seed nodes dynamically.
        List<Address> seedNodes = Collections.singletonList(
                new Address("akka", actorSystem.name(), localIP, SEED_NODE_PORT));
        cluster.joinSeedNodes(seedNodes);

        return actorSystem;
    }

    /**
     * Starts one node on port 1003, creates a cluster-aware group router over the
     * worker pools and a {@link Publisher}, then publishes ten messages once this
     * node's member-up callback fires.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ActorSystem actorSystem3 = createActorSystem("1003", "akka-server");

        Iterable<String> routeesPaths = Lists.newArrayList(WORKER_ROUTEE_PATH);
        boolean allowLocalRoutees = true;
        Set<String> useRoles = Collections.singleton("akka-server");
        // Upper bound on the total number of routees in the cluster.
        int totalInstances = 100;
        // The group router is created (and registered as /user/workerRouter) but this
        // demo does not currently send any message through it.
        actorSystem3.actorOf(
                new ClusterRouterGroup(
                        new RoundRobinGroup(routeesPaths),
                        new ClusterRouterGroupSettings(
                                totalInstances, routeesPaths, allowLocalRoutees, useRoles))
                        .props(),
                "workerRouter");

        ActorRef publisher = actorSystem3.actorOf(Props.create(Publisher.class), "publisher");

        // Defer publishing until this node's own member-up callback is invoked.
        Cluster cluster = Cluster.get(actorSystem3);
        cluster.registerOnMemberUp(
                () -> {
                    // NOTE(review): per the original comment, subscriptions are only
                    // replicated "after a while", so early publishes may not reach
                    // every subscriber - confirm if delivery to all nodes matters.
                    for (int i = 0; i < 10; i++) {
                        publisher.tell("hello", ActorRef.noSender());
                    }
                }
        );
    }

    /**
     * Worker actor: replies to {@link Work} messages and logs plain strings
     * (such as payloads published on the topic).
     */
    public static class Worker extends AbstractActor {
        /** Label passed in at creation; used only in log lines and replies. */
        private final String name;

        Worker(String name) {
            this.name = name;
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(Work.class, message -> {
                        // Identify this worker by its label and its own ActorRef.
                        log.info("I am node{} my name is {},I accept {}", name, getSelf(), message.payload);
                        getSender().tell("I am " + name + " and I receive your msg!", getSelf());
                    })
                    .match(DistributedPubSubMediator.SubscribeAck.class, msg -> {
                        // Subscription acknowledgement: intentionally ignored.
                    })
                    .match(String.class, msg -> log.info("I am node {} my name is {},I accept {}", name, getSelf(), msg))
                    .build();
        }

    }

    /** Work request carrying a single string payload. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class Work implements PowerSerializable {
        /** Text delivered to the worker; read directly by {@link Worker}. */
        public String payload;
    }

    /**
     * Actor that upper-cases every string it receives and publishes the result
     * to the {@value #CONTENT_TOPIC} topic through the pub-sub mediator.
     */
    static class Publisher extends AbstractActor {
        // Looking up the mediator activates the DistributedPubSub extension.
        private final ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(
                            String.class,
                            in -> {
                                log.info("I am publisher,I start to publish!");
                                // Locale.ROOT keeps the published text independent of the JVM default locale.
                                String out = in.toUpperCase(Locale.ROOT);
                                mediator.tell(new DistributedPubSubMediator.Publish(CONTENT_TOPIC, out), getSelf());
                            })
                    .build();
        }
    }
}
